Cubic spline interpolation exercise

scientific computing
numerical methods
Large language models (predictably) learn to represent the semantic meaning of sentences.
Author

Augustas Macijauskas

Published

April 13, 2024

Cubic spline interpolation exercise local

Imports

Toggle cells below if you want to see what imports are being made.

Code
%load_ext autoreload
%autoreload 2
Code
import numpy as np
import plotly.graph_objects as go
from scipy.interpolate import CubicSpline

# Ensures we can render plotly plots with quarto
import plotly.io as pio
pio.renderers.default = "plotly_mimetype+notebook_connected"

Data

import json

with open("hr-data.json", "r") as file:
    data_raw = json.load(file)

len(data_raw["dataOriginal"])
4248
from datetime import datetime

def get_seconds_since_beginning_of_day(date_string: str):
    date = datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S.000Z")
    return date.hour * 3600 + date.minute * 60 + date.second


data = [{**item, "time": get_seconds_since_beginning_of_day(item["time"])} for item in data_raw["dataOriginal"]]
data = data[:-1]  # drop last point since it overlaps with the next day
data[:5]
[{'value': 78, 'time': 15},
 {'value': 58, 'time': 30},
 {'value': 58, 'time': 45},
 {'value': 58, 'time': 60},
 {'value': 58, 'time': 75}]

Cubic spline interpolation with scipy

x_train = [item["time"] for item in data]
y_train = [item["value"] for item in data]

cs = CubicSpline(x_train, y_train, bc_type="natural", extrapolate=True)

x_test = np.arange(24 * 3600)
y_test = cs(x_test)
# Create a figure
fig = go.Figure()

# Add scatter plot
fig.add_trace(go.Scatter(x=x_train, y=y_train, mode='markers', name='Original points'))

# Add line plot
fig.add_trace(go.Scatter(x=x_test, y=y_test, mode='lines', name='Fitted line'))

# Add titles and labels
fig.update_layout(
    title='Scipy interpolation',
    xaxis_title='time',
    yaxis_title='heart rate'
)

# Show the plot
fig.show()

Custom implementation of cubic spline interpolation

def tridiagonal_linear_system_solver(d_lower, d_main, d_upper, b):
    """Solve a tridiagonal linear system given the main, lower, and upper diagonals, as well as the vector b"""

    n = len(d_main)

    # Forward sweep
    for i in range(1, n):
        w = d_lower[i - 1] / d_main[i - 1]
        d_main[i] -= w * d_upper[i - 1]
        b[i] -= w * b[i - 1]

    # Back substitution
    x = np.zeros(n)
    x[-1] = b[-1] / d_main[-1]
    for i in range(n - 2, -1, -1):
        x[i] = (b[i] - d_upper[i] * x[i + 1]) / d_main[i]

    return x


class CubicSplineCustom:

    def __init__(self, x, y):
        if not np.all(np.diff(x) > 0):
            raise ValueError("x values must be in ascending order.")

        self.x = np.array(x)
        self.y = np.array(y)
        self._find_coeffs()

    def _find_coeffs(self):
        # Find the relevant diagonals and from self.x and self.y assuming natural boundary conditions
        d_lower, d_main, d_upper, b, h = self._compute_diagonals_and_b()

        # Compute the M_i's for i = 1, n-1 (since M_0 and M_n are assumed to be 0). This will require solving a
        # tridiagonal linear system
        ms = tridiagonal_linear_system_solver(d_lower, d_main, d_upper, b)

        # Compute the coefficients of the cubic polynomials
        self.c = self._compute_coefficients_from_second_derivatives(ms, h)

    def _compute_diagonals_and_b(self):
        x, y = self.x, self.y
        h = np.diff(x)

        # Compute the diagonals for the tridiagonal matrix
        d_lower = h.copy()
        d_lower[-1] = 0  # one of the naturals BCs
        d_upper = h.copy()
        d_upper[0] = 0

        d_main = 2 * np.ones(len(x))
        d_main[1:-1] *= h[:-1] + h[1:]

        b = np.zeros(len(x))
        y_diff = np.diff(y)
        b[1:-1] = 6 * (y_diff[1:] / h[1:] - y_diff[:-1] / h[:-1])

        return d_lower, d_main, d_upper, b, h

    def _compute_coefficients_from_second_derivatives(self, ms, h):
        coeffs = np.zeros((4, len(self.x) - 1))
        coeffs[0, :] = (ms[1:] - ms[:-1]) / (6 * h)
        coeffs[1, :] = ms[:-1] / 2
        coeffs[2, :] = (self.y[1:] - self.y[:-1]) / h - (ms[1:] + 2 * ms[:-1]) * h / 6
        coeffs[3, :] = self.y[:-1]

        return coeffs

    def _get_index(self, x):
        """Performs binary search"""
        low, high = 0, len(self.x) - 1

        while low < high:
            mid = (low + high) // 2
            if self.x[mid] <= x:
                low = mid + 1
            else:
                high = mid

        return min(max(low - 1, 0), len(self.x) - 2)

    def interpolate(self, x: float):
        # Find which polynomial is appropriate and evaluate at x
        idx = self._get_index(x)
        dx = x - self.x[idx]
        return (self.c[0, idx] * dx + self.c[1, idx]) * dx**2 + self.c[2, idx] * dx + self.c[3, idx]


# Example usage:
custom_spline = CubicSplineCustom(x_train, y_train)
print(custom_spline.c.shape)
print(custom_spline.interpolate(5.5))  # Evaluate at x = 5.5
(4, 4246)
92.69934779516517
assert cs.c.shape == custom_spline.c.shape
assert np.allclose(cs.c, custom_spline.c)
np.sqrt(((cs.c - custom_spline.c) ** 2).mean())
1.1509499204046055e-17
assert np.allclose(custom_spline.interpolate(-1), cs(-1))
assert np.allclose(custom_spline.interpolate(86500), cs(86500))
# Create a figure
fig = go.Figure()

# Add scatter plot
fig.add_trace(go.Scatter(x=x_train, y=y_train, mode="markers", name="Original data"))

# Add line plot
fig.add_trace(go.Scatter(x=x_test, y=y_test, mode="lines", name="Fitted line"))

y_test_custom = [custom_spline.interpolate(x) for x in x_test]
assert np.allclose(y_test, y_test_custom)
fig.add_trace(
    go.Scatter(x=x_test, y=y_test_custom, mode="lines", name="Fitted line (custom)", line=dict(dash="longdash"))
)

# Add titles and labels
fig.update_layout(title="Scipy and custom interpolations", xaxis_title="time", yaxis_title="heart rate")

# Show the plot
fig.show()